{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Input pipeline into Keras\n",
    "\n",
    "In this notebook, we will look at how to read large datasets, datasets that may not fit into memory, using TensorFlow. We can use the tf.data pipeline to feed data to Keras models that use a TensorFlow backend.\n",
    "\n",
    "## Learning Objectives\n",
    "1. Use tf.data to read CSV files\n",
    "2. Load the training data into memory\n",
    "3. Prune the data by removing columns\n",
    "4. Use tf.data to map features and labels\n",
    "5. Adjust the batch size of our dataset\n",
    "6. Shuffle the dataset to optimize for deep learning\n",
    "\n",
    "Each learning objective will correspond to a __#TODO__ in the [student lab notebook](../labs/input_pipeline.ipynb) -- try to complete that notebook first before reviewing this solution notebook. \n",
    "\n",
    "Let's start off with the Python imports that we need."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "export PROJECT=$(gcloud config list project --format \"value(core.project)\")\n",
    "echo \"Your current GCP Project Name is: \"$PROJECT"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install tensorflow==2.1.0 --user"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's make sure we install the necessary version of tensorflow. After doing the pip install above, click __Restart the kernel__ on the notebook so that the Python environment picks up the new packages."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "TensorFlow version:  2.1.0\n"
     ]
    }
   ],
   "source": [
    "import os, json, math\n",
    "import numpy as np\n",
    "import shutil\n",
    "import logging\n",
    "# SET TF ERROR LOG VERBOSITY\n",
    "logging.getLogger(\"tensorflow\").setLevel(logging.ERROR)\n",
    "import tensorflow as tf\n",
    "\n",
    "print(\"TensorFlow version: \",tf.version.VERSION)\n",
    "\n",
    "PROJECT = \"your-gcp-project-here\" # REPLACE WITH YOUR PROJECT NAME\n",
    "REGION = \"us-central1\" # REPLACE WITH YOUR BUCKET REGION e.g. us-central1\n",
    "\n",
    "# Do not change these\n",
    "os.environ[\"PROJECT\"] = PROJECT\n",
    "os.environ[\"REGION\"] = REGION\n",
    "os.environ[\"BUCKET\"] = PROJECT # DEFAULT BUCKET WILL BE PROJECT ID\n",
    "\n",
    "if PROJECT == \"your-gcp-project-here\":\n",
    "  print(\"Don't forget to update your PROJECT name! Currently:\", PROJECT)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [],
   "source": [
    "# If you're not using TF 2.0+, let's enable eager execution\n",
    "if tf.version.VERSION < '2.0':\n",
    "    print('Enabling v2 behavior and eager execution; if necessary restart kernel, and rerun notebook')\n",
    "    tf.enable_v2_behavior()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Locating the CSV files\n",
    "\n",
    "We will start with the CSV files that we wrote out in the [first notebook](../01_explore/taxifare.iypnb) of this sequence. Just so you don't have to run the notebook, we saved a copy in ../data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "-rw-r--r-- 1 jupyter jupyter 123590 Sep 12 16:33 ../data/taxi-test.csv\n",
      "-rw-r--r-- 1 jupyter jupyter 579055 Sep 12 16:33 ../data/taxi-train.csv\n",
      "-rw-r--r-- 1 jupyter jupyter 123114 Sep 12 16:33 ../data/taxi-valid.csv\n"
     ]
    }
   ],
   "source": [
    "!ls -l ../../data/*.csv"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Use tf.data to read the CSV files\n",
    "\n",
    "See the documentation for [make_csv_dataset](https://www.tensorflow.org/api_docs/python/tf/data/experimental/make_csv_dataset).\n",
    "If you have TFRecords (which is recommended), use [make_batched_features_dataset](https://www.tensorflow.org/api_docs/python/tf/data/experimental/make_batched_features_dataset) instead."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [],
   "source": [
    "CSV_COLUMNS  = ['fare_amount',  'pickup_datetime',\n",
    "                'pickup_longitude', 'pickup_latitude', \n",
    "                'dropoff_longitude', 'dropoff_latitude', \n",
    "                'passenger_count', 'key']\n",
    "LABEL_COLUMN = 'fare_amount'\n",
    "DEFAULTS     = [[0.0],['na'],[0.0],[0.0],[0.0],[0.0],[0.0],['na']]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "<PrefetchDataset shapes: OrderedDict([(fare_amount, (1,)), (pickup_datetime, (1,)), (pickup_longitude, (1,)), (pickup_latitude, (1,)), (dropoff_longitude, (1,)), (dropoff_latitude, (1,)), (passenger_count, (1,)), (key, (1,))]), types: OrderedDict([(fare_amount, tf.float32), (pickup_datetime, tf.string), (pickup_longitude, tf.float32), (pickup_latitude, tf.float32), (dropoff_longitude, tf.float32), (dropoff_latitude, tf.float32), (passenger_count, tf.float32), (key, tf.string)])>\n"
     ]
    }
   ],
   "source": [
    "# load the training data\n",
    "def load_dataset(pattern):\n",
    "  return tf.data.experimental.make_csv_dataset(pattern, 1, CSV_COLUMNS, DEFAULTS)\n",
    "\n",
    "tempds = load_dataset('../../data/taxi-train*')\n",
    "print(tempds)"
   ]
  },
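  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The call above relies on the defaults of `make_csv_dataset`, which, as far as we know, shuffle the rows and repeat the data indefinitely (`shuffle=True`, `num_epochs=None`). The sketch below is an illustration rather than part of the lab: it writes a tiny, hypothetical CSV file (the file name and the `value` and `tag` columns are made up) and reads it once, in order, so that the batches are easy to inspect.\n",
    "\n",
    "```python\n",
    "import os\n",
    "import tempfile\n",
    "\n",
    "import tensorflow as tf\n",
    "\n",
    "# Hypothetical three-row file without a header, written only for this sketch.\n",
    "csv_path = os.path.join(tempfile.mkdtemp(), 'tiny.csv')\n",
    "with open(csv_path, 'w') as f:\n",
    "    for row in ['1.0,a', '2.0,b', '3.0,c']:\n",
    "        print(row, file=f)\n",
    "\n",
    "ds = tf.data.experimental.make_csv_dataset(\n",
    "    csv_path,\n",
    "    batch_size=2,\n",
    "    column_names=['value', 'tag'],\n",
    "    column_defaults=[[0.0], ['na']],\n",
    "    header=False,    # the sketch file has no header line\n",
    "    num_epochs=1,    # read the file once instead of repeating forever\n",
    "    shuffle=False,   # keep the rows in file order\n",
    ")\n",
    "\n",
    "# Convert every batch to plain Python values so it is easy to read.\n",
    "batches = [{k: v.numpy().tolist() for k, v in batch.items()} for batch in ds]\n",
    "print(batches)\n",
    "```\n",
    "\n",
    "With three rows and `batch_size=2`, a single unshuffled pass should give one batch of two rows followed by a final batch holding the remaining row. String columns come back as bytes."
   ]
  },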
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note that this is a prefetched dataset. If you loop over the dataset, you'll get the rows one-by-one. Let's convert each row into a Python dictionary:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "0 {'pickup_datetime': array([b'2012-08-10 12:17:02 UTC'], dtype=object), 'pickup_longitude': array([-73.98314], dtype=float32), 'pickup_latitude': array([40.741684], dtype=float32), 'key': array([b'460'], dtype=object), 'fare_amount': array([6.1], dtype=float32), 'dropoff_longitude': array([-73.995285], dtype=float32), 'passenger_count': array([1.], dtype=float32), 'dropoff_latitude': array([40.744015], dtype=float32)}\n",
      "1 {'pickup_datetime': array([b'2014-09-10 07:23:17 UTC'], dtype=object), 'pickup_longitude': array([-73.99419], dtype=float32), 'pickup_latitude': array([40.745983], dtype=float32), 'key': array([b'206'], dtype=object), 'fare_amount': array([5.], dtype=float32), 'dropoff_longitude': array([-73.98535], dtype=float32), 'passenger_count': array([1.], dtype=float32), 'dropoff_latitude': array([40.753426], dtype=float32)}\n",
      "2 {'pickup_datetime': array([b'2013-04-18 08:48:00 UTC'], dtype=object), 'pickup_longitude': array([-73.864914], dtype=float32), 'pickup_latitude': array([40.771378], dtype=float32), 'key': array([b'5677'], dtype=object), 'fare_amount': array([6.], dtype=float32), 'dropoff_longitude': array([-73.86421], dtype=float32), 'passenger_count': array([1.], dtype=float32), 'dropoff_latitude': array([40.77], dtype=float32)}\n",
      "3 {'pickup_datetime': array([b'2013-04-18 08:48:00 UTC'], dtype=object), 'pickup_longitude': array([-73.96325], dtype=float32), 'pickup_latitude': array([40.774113], dtype=float32), 'key': array([b'2940'], dtype=object), 'fare_amount': array([5.], dtype=float32), 'dropoff_longitude': array([-73.95772], dtype=float32), 'passenger_count': array([1.], dtype=float32), 'dropoff_latitude': array([40.779438], dtype=float32)}\n"
     ]
    }
   ],
   "source": [
    "# print a few of the rows\n",
    "for n, data in enumerate(tempds):\n",
    "    row_data = {k: v.numpy() for k,v in data.items()}\n",
    "    print(n, row_data)\n",
    "    if n > 2:\n",
    "        break"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "What we really need is a dictionary of features + a label. So, we have to do two things to the above dictionary. (1) remove the unwanted column \"key\" and (2) keep the label separate from the features."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "0 [6.1] {'pickup_longitude': array([-73.98314], dtype=float32), 'pickup_latitude': array([40.741684], dtype=float32), 'dropoff_longitude': array([-73.995285], dtype=float32), 'passenger_count': array([1.], dtype=float32), 'dropoff_latitude': array([40.744015], dtype=float32)}\n",
      "1 [5.] {'pickup_longitude': array([-73.99419], dtype=float32), 'pickup_latitude': array([40.745983], dtype=float32), 'dropoff_longitude': array([-73.98535], dtype=float32), 'passenger_count': array([1.], dtype=float32), 'dropoff_latitude': array([40.753426], dtype=float32)}\n",
      "2 [6.] {'pickup_longitude': array([-73.864914], dtype=float32), 'pickup_latitude': array([40.771378], dtype=float32), 'dropoff_longitude': array([-73.86421], dtype=float32), 'passenger_count': array([1.], dtype=float32), 'dropoff_latitude': array([40.77], dtype=float32)}\n",
      "3 [5.] {'pickup_longitude': array([-73.96325], dtype=float32), 'pickup_latitude': array([40.774113], dtype=float32), 'dropoff_longitude': array([-73.95772], dtype=float32), 'passenger_count': array([1.], dtype=float32), 'dropoff_latitude': array([40.779438], dtype=float32)}\n"
     ]
    }
   ],
   "source": [
    "# get features, label\n",
    "def features_and_labels(row_data):\n",
    "    for unwanted_col in ['pickup_datetime', 'key']:\n",
    "        row_data.pop(unwanted_col)\n",
    "    label = row_data.pop(LABEL_COLUMN)\n",
    "    return row_data, label  # features, label\n",
    "\n",
    "# print a few rows to make it sure works\n",
    "for n, data in enumerate(tempds):\n",
    "    row_data = {k: v.numpy() for k,v in data.items()}\n",
    "    features, label = features_and_labels(row_data)\n",
    "    print(n, label, features)\n",
    "    if n > 2:\n",
    "        break"
   ]
  },
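  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "One detail worth noticing is that `features_and_labels` pops keys from the dictionary it is given, so the caller's dictionary is modified. That is fine in the loop above, where `row_data` is rebuilt on every iteration. If you want to keep the original row around, a non-mutating variant could look like the sketch below. The names `features_and_labels_copy` and `UNWANTED_COLS` and the sample row are made up for illustration.\n",
    "\n",
    "```python\n",
    "LABEL_COLUMN = 'fare_amount'\n",
    "UNWANTED_COLS = ['pickup_datetime', 'key']  # hypothetical constant for this sketch\n",
    "\n",
    "\n",
    "def features_and_labels_copy(row_data):\n",
    "    # Work on a shallow copy so the caller's dictionary is left untouched.\n",
    "    features = dict(row_data)\n",
    "    for unwanted_col in UNWANTED_COLS:\n",
    "        features.pop(unwanted_col)\n",
    "    label = features.pop(LABEL_COLUMN)\n",
    "    return features, label\n",
    "\n",
    "\n",
    "# A made-up row with only a few of the taxi columns.\n",
    "row = {\n",
    "    'fare_amount': 6.1,\n",
    "    'pickup_datetime': '2012-08-10 12:17:02 UTC',\n",
    "    'pickup_longitude': -73.98314,\n",
    "    'key': '460',\n",
    "}\n",
    "features, label = features_and_labels_copy(row)\n",
    "print(label, features)\n",
    "print(sorted(row))  # the original row still has all four keys\n",
    "```"
   ]
  },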
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Batching\n",
    "\n",
    "Let's do both (loading, features_label)\n",
    "in our load_dataset function, and also add batching."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[(OrderedDict([('pickup_longitude', <tf.Tensor: id=419, shape=(2,), dtype=float32, numpy=array([-73.976524, -73.999306], dtype=float32)>), ('pickup_latitude', <tf.Tensor: id=418, shape=(2,), dtype=float32, numpy=array([40.756996, 40.73008 ], dtype=float32)>), ('dropoff_longitude', <tf.Tensor: id=416, shape=(2,), dtype=float32, numpy=array([-73.94917, -73.98662], dtype=float32)>), ('dropoff_latitude', <tf.Tensor: id=415, shape=(2,), dtype=float32, numpy=array([40.777275, 40.728127], dtype=float32)>), ('passenger_count', <tf.Tensor: id=417, shape=(2,), dtype=float32, numpy=array([1., 1.], dtype=float32)>)]), <tf.Tensor: id=420, shape=(2,), dtype=float32, numpy=array([14. ,  5.3], dtype=float32)>), (OrderedDict([('pickup_longitude', <tf.Tensor: id=425, shape=(2,), dtype=float32, numpy=array([-73.97756, -73.97157], dtype=float32)>), ('pickup_latitude', <tf.Tensor: id=424, shape=(2,), dtype=float32, numpy=array([40.74649, 40.75433], dtype=float32)>), ('dropoff_longitude', <tf.Tensor: id=422, shape=(2,), dtype=float32, numpy=array([-73.99108, -73.98791], dtype=float32)>), ('dropoff_latitude', <tf.Tensor: id=421, shape=(2,), dtype=float32, numpy=array([40.765526, 40.77648 ], dtype=float32)>), ('passenger_count', <tf.Tensor: id=423, shape=(2,), dtype=float32, numpy=array([3., 1.], dtype=float32)>)]), <tf.Tensor: id=426, shape=(2,), dtype=float32, numpy=array([ 8.5, 10.5], dtype=float32)>), (OrderedDict([('pickup_longitude', <tf.Tensor: id=431, shape=(2,), dtype=float32, numpy=array([-73.96533 , -73.991264], dtype=float32)>), ('pickup_latitude', <tf.Tensor: id=430, shape=(2,), dtype=float32, numpy=array([40.769676, 40.74937 ], dtype=float32)>), ('dropoff_longitude', <tf.Tensor: id=428, shape=(2,), dtype=float32, numpy=array([-73.972534, -73.97786 ], dtype=float32)>), ('dropoff_latitude', <tf.Tensor: id=427, shape=(2,), dtype=float32, numpy=array([40.764854, 40.75075 ], dtype=float32)>), ('passenger_count', <tf.Tensor: id=429, shape=(2,), dtype=float32, 
numpy=array([1., 1.], dtype=float32)>)]), <tf.Tensor: id=432, shape=(2,), dtype=float32, numpy=array([5.3, 7. ], dtype=float32)>)]\n"
     ]
    }
   ],
   "source": [
    "def load_dataset(pattern, batch_size):\n",
    "  return (\n",
    "      tf.data.experimental.make_csv_dataset(pattern, batch_size, CSV_COLUMNS, DEFAULTS)\n",
    "             .map(features_and_labels) # features, label\n",
    "  )\n",
    "\n",
    "# try changing the batch size and watch what happens.\n",
    "tempds = load_dataset('../../data/taxi-train*', batch_size=2)\n",
    "print(list(tempds.take(3))) # truncate and print as a list "
   ]
  },
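  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To see what batching does to the tensor shapes without reading any files, here is a minimal sketch on a small in-memory dataset. The values and the `split_label` helper are made up for illustration; only `from_tensor_slices`, `batch` and `map` are tf.data calls.\n",
    "\n",
    "```python\n",
    "import tensorflow as tf\n",
    "\n",
    "# Four made-up rows, stored column by column.\n",
    "columns = {\n",
    "    'fare_amount': [6.1, 5.0, 6.0, 5.0],\n",
    "    'passenger_count': [1.0, 1.0, 2.0, 1.0],\n",
    "}\n",
    "\n",
    "\n",
    "def split_label(batch):\n",
    "    # Hypothetical helper: separate the label column from the features.\n",
    "    features = dict(batch)\n",
    "    label = features.pop('fare_amount')\n",
    "    return features, label\n",
    "\n",
    "\n",
    "ds = tf.data.Dataset.from_tensor_slices(columns).batch(2).map(split_label)\n",
    "\n",
    "# Each element now carries a leading batch dimension of size 2.\n",
    "shapes = []\n",
    "for features, label in ds:\n",
    "    shapes.append(label.shape.as_list())\n",
    "    print(label.shape, features['passenger_count'].numpy())\n",
    "```\n",
    "\n",
    "Changing the argument of `batch` changes the leading dimension of every feature tensor and of the label in the same way."
   ]
  },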
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Shuffling\n",
    "\n",
    "When training a deep learning model in batches over multiple workers, it is helpful if we shuffle the data. That way, different workers will be working on different parts of the input file at the same time, and so averaging gradients across workers will help. Also, during training, we will need to read the data indefinitely."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[(OrderedDict([('pickup_longitude', <tf.Tensor: id=552, shape=(2,), dtype=float32, numpy=array([-74.00672, -73.78672], dtype=float32)>), ('pickup_latitude', <tf.Tensor: id=551, shape=(2,), dtype=float32, numpy=array([40.741623, 40.641136], dtype=float32)>), ('dropoff_longitude', <tf.Tensor: id=549, shape=(2,), dtype=float32, numpy=array([-73.9633 , -73.78232], dtype=float32)>), ('dropoff_latitude', <tf.Tensor: id=548, shape=(2,), dtype=float32, numpy=array([40.762768, 40.648724], dtype=float32)>), ('passenger_count', <tf.Tensor: id=550, shape=(2,), dtype=float32, numpy=array([1., 1.], dtype=float32)>)]), <tf.Tensor: id=553, shape=(2,), dtype=float32, numpy=array([17.5,  7.7], dtype=float32)>)]\n",
      "[(OrderedDict([('pickup_longitude', <tf.Tensor: id=655, shape=(2,), dtype=float32, numpy=array([-73.98504, -73.99718], dtype=float32)>), ('pickup_latitude', <tf.Tensor: id=654, shape=(2,), dtype=float32, numpy=array([40.76108 , 40.714516], dtype=float32)>), ('dropoff_longitude', <tf.Tensor: id=652, shape=(2,), dtype=float32, numpy=array([-73.9978 , -73.99129], dtype=float32)>), ('dropoff_latitude', <tf.Tensor: id=651, shape=(2,), dtype=float32, numpy=array([40.765102, 40.75033 ], dtype=float32)>), ('passenger_count', <tf.Tensor: id=653, shape=(2,), dtype=float32, numpy=array([2., 1.], dtype=float32)>)]), <tf.Tensor: id=656, shape=(2,), dtype=float32, numpy=array([ 5.7, 10.5], dtype=float32)>)]\n"
     ]
    }
   ],
   "source": [
    "def load_dataset(pattern, batch_size=1, mode=tf.estimator.ModeKeys.EVAL):\n",
    "  dataset = (tf.data.experimental.make_csv_dataset(pattern, batch_size, CSV_COLUMNS, DEFAULTS)\n",
    "             .map(features_and_labels) # features, label\n",
    "             .cache())\n",
    "  if mode == tf.estimator.ModeKeys.TRAIN:\n",
    "        dataset = dataset.shuffle(1000).repeat()\n",
    "  dataset = dataset.prefetch(1) # take advantage of multi-threading; 1=AUTOTUNE\n",
    "  return dataset\n",
    "\n",
    "tempds = load_dataset('../../data/taxi-train*', 2, tf.estimator.ModeKeys.TRAIN)\n",
    "print(list(tempds.take(1)))\n",
    "tempds = load_dataset('../../data/taxi-valid*', 2, tf.estimator.ModeKeys.EVAL)\n",
    "print(list(tempds.take(1)))"
   ]
  },
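  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The order of `batch`, `shuffle` and `repeat` matters. As a rough illustration on a toy dataset of six integers (not the taxi data), the sketch below applies them in the same order as `load_dataset` and uses `tf.data.experimental.AUTOTUNE` for prefetching, which is a swapped-in choice for this sketch rather than what the function above does.\n",
    "\n",
    "```python\n",
    "import tensorflow as tf\n",
    "\n",
    "AUTOTUNE = tf.data.experimental.AUTOTUNE\n",
    "\n",
    "ds = (\n",
    "    tf.data.Dataset.range(6)\n",
    "    .batch(2)            # batches: [0 1], [2 3], [4 5]\n",
    "    .shuffle(3)          # after batch(), so whole batches are reordered\n",
    "    .repeat()            # start over whenever the data runs out\n",
    "    .prefetch(AUTOTUNE)  # let tf.data choose the prefetch buffer size\n",
    ")\n",
    "\n",
    "# Take two epochs' worth of batches (3 batches per epoch).\n",
    "seen = [tuple(batch.numpy().tolist()) for batch in ds.take(6)]\n",
    "print(seen)\n",
    "```\n",
    "\n",
    "Because `shuffle` comes before `repeat`, every batch of one pass is emitted before the next pass begins, and the pairs inside each batch stay together. To shuffle individual rows instead, the shuffle would have to happen before batching."
   ]
  },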
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In the next notebook, we will build the model using this input pipeline."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Copyright 2022 Google Inc.\n",
    "Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at\n",
    "http://www.apache.org/licenses/LICENSE-2.0\n",
    "Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
